VARMAX (VARMA with eXogenous inputs)#
Goals#
Precisely define VARX and VARMAX and show how they extend VAR/VARMA.
Build intuition for exogenous inputs as known drivers / control signals.
Understand complexity and identifiability pitfalls (exogeneity, collinearity, MA issues).
Implement simulation + VARX fitting + forecasting + shock propagation in NumPy.
Visualize multivariate forecasts and shock propagation with Plotly.
Prerequisites#
VAR/VARMA basics (previous folder)
Linear regression / OLS
1) Definitions: VARX vs VARMAX#
Let (\mathbf{y}_t\in\mathbb{R}^k) be the target vector and (\mathbf{x}_t\in\mathbb{R}^m) be a vector of exogenous regressors (known inputs).
VARX(p,r)#
A VARX (sometimes written VAR(p)+X) extends VAR with current and lagged exogenous inputs:

$$
\mathbf{y}_t = \mathbf{c} + \sum_{i=1}^{p} A_i \mathbf{y}_{t-i} + \sum_{\ell=0}^{r} B_\ell \mathbf{x}_{t-\ell} + \boldsymbol{\varepsilon}_t
$$
VARMAX(p,q,r)#
A VARMAX is the full ARMA + X model, adding a moving-average term in the shocks:

$$
\mathbf{y}_t = \mathbf{c} + \sum_{i=1}^{p} A_i \mathbf{y}_{t-i} + \sum_{\ell=0}^{r} B_\ell \mathbf{x}_{t-\ell} + \boldsymbol{\varepsilon}_t + \sum_{j=1}^{q} M_j \boldsymbol{\varepsilon}_{t-j}
$$
Transfer-function (system) view#
Using the lag polynomials

$$
A(L) = I_k - \sum_{i=1}^{p} A_i L^i, \qquad
B(L) = \sum_{\ell=0}^{r} B_\ell L^\ell, \qquad
M(L) = I_k + \sum_{j=1}^{q} M_j L^j,
$$

the model becomes

$$
A(L)\,\mathbf{y}_t = \mathbf{c} + B(L)\,\mathbf{x}_t + M(L)\,\boldsymbol{\varepsilon}_t.
$$
So VARMAX is a multi-input multi-output (MIMO) linear system:
(A(L)^{-1}B(L)): response of outputs to inputs
(A(L)^{-1}M(L)): response of outputs to shocks
2) What “exogenous” really requires#
For OLS-style estimation of VARX, a common assumption is strict exogeneity:

$$
\mathbb{E}\left[\boldsymbol{\varepsilon}_t \mid \mathbf{x}_1, \dots, \mathbf{x}_T\right] = \mathbf{0} \qquad \text{for all } t.
$$
If (\mathbf{x}_t) is correlated with (\varepsilon_t) (simultaneity, omitted variables, feedback), then naive OLS is biased.
Practical examples of valid-ish (\mathbf{x}_t):
calendar effects / holidays
known interventions (pricing change, policy shock)
lagged covariates measured without error
Risky (\mathbf{x}_t): contemporaneous variables influenced by (\mathbf{y}_t) itself, as the toy regression below demonstrates.
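To make the risk concrete, here is a minimal sketch (a hypothetical scalar regression, not one of the models in this notebook): when the "exogenous" input loads on the contemporaneous shock, the OLS slope is biased away from the true coefficient.

```python
import numpy as np

rng_demo = np.random.default_rng(0)
n = 50_000
beta_true = 1.0

eps = rng_demo.standard_normal(n)
# feedback: the "exogenous" input also reacts to the contemporaneous shock
x_bad = rng_demo.standard_normal(n) + 0.8 * eps
y = beta_true * x_bad + eps

beta_hat = np.linalg.lstsq(x_bad[:, None], y, rcond=None)[0][0]
# plim beta_hat = beta + Cov(x, eps) / Var(x) = 1 + 0.8 / 1.64 ~ 1.49
print(f"true beta: {beta_true:.2f}, OLS estimate: {beta_hat:.3f}")
```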
3) Complexity + identifiability pitfalls#
Parameter count (ignoring (\Sigma)) for VARMAX(p,q,r):

$$
\underbrace{k}_{\mathbf{c}} \;+\; \underbrace{p k^2}_{A_i} \;+\; \underbrace{(r+1) k m}_{B_\ell} \;+\; \underbrace{q k^2}_{M_j}
$$

The (k^2) blocks grow quickly; a quick numeric check follows the tradeoff notes below.
Pitfalls:
Collinearity: the (\mathbf{x}_{t-\ell}) can be highly correlated with each other and with the (\mathbf{y}_{t-i}).
Exogeneity violations: if inputs are not exogenous, estimates can be biased.
MA identifiability: all VARMA issues still apply (invertibility, cancellation, non-unique parameterizations).
Tradeoff intuition:
Adding (\mathbf{x}_t) can reduce unexplained variance and improve interpretability.
Adding an MA part can fix autocorrelated residuals, but estimation becomes substantially harder.
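A minimal sketch of the count above (varmax_param_count is an illustrative helper, not used elsewhere in this notebook):

```python
def varmax_param_count(k: int, m: int, p: int, q: int, r: int) -> int:
    """Mean-equation parameters only; the k(k+1)/2 entries of Sigma are excluded."""
    return k + p * k**2 + (r + 1) * k * m + q * k**2

# the 2-output, 1-input VARMAX(2,1,1) simulated later in this notebook
print(varmax_param_count(k=2, m=1, p=2, q=1, r=1))   # 18
# a modest 5-output, 3-input VARMAX(4,2,4) already needs an order of magnitude more
print(varmax_param_count(k=5, m=3, p=4, q=2, r=4))   # 230
```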
4) Intuition with abstract systems#
Think of VARMAX as a linear dynamical system with:
State feedback: (A_i) couples past outputs into the present.
Control / forcing inputs: (B_\ell) injects known signals (\mathbf{x}) (with possible delays).
Disturbances: (\varepsilon_t) are random shocks.
Shock filter: (M_j) lets shocks persist briefly (colored noise).
Two useful “shock propagation” objects:
Innovation impulse response (\Psi_h): effect of a one-time shock in (\varepsilon_t) on future (\mathbf{y}).
Dynamic multipliers (\Theta_h): effect of a one-time pulse in (\mathbf{x}_t) on future (\mathbf{y}).
When (q=0) (VARX), (\Theta_h) satisfies a simple recursion (derived below; see also the code).
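The recursion can be read off by matching powers of (L) in (A(L)\Theta(L) = B(L)), where (\Theta(L) = \sum_{h\ge 0}\Theta_h L^h = A(L)^{-1}B(L)); a short sketch:

$$
\left(I_k - \sum_{i=1}^{p} A_i L^i\right)\sum_{h \ge 0} \Theta_h L^h = \sum_{\ell=0}^{r} B_\ell L^\ell
\;\;\Longrightarrow\;\;
\Theta_h = B_h + \sum_{i=1}^{\min(p,h)} A_i \Theta_{h-i},
\qquad B_h := 0 \;\text{ for } h > r.
$$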
import sys
import numpy as np
import plotly
import plotly.graph_objects as go
import os
import plotly.io as pio
from plotly.subplots import make_subplots
pio.templates.default = "plotly_white"
pio.renderers.default = os.environ.get("PLOTLY_RENDERER", "notebook")
np.set_printoptions(precision=4, suppress=True)
SEED = 7
rng = np.random.default_rng(SEED)
print("Python:", sys.version.split()[0])
print("NumPy:", np.__version__)
print("Plotly:", plotly.__version__)
Python: 3.12.9
NumPy: 1.26.2
Plotly: 6.5.2
def simulate_varmax(
x: np.ndarray,
A_list: list[np.ndarray],
B_list: list[np.ndarray],
M_list: list[np.ndarray] | None = None,
c: np.ndarray | None = None,
Sigma: np.ndarray | None = None,
burnin: int = 200,
seed: int = 0,
) -> tuple[np.ndarray, np.ndarray]:
"""Simulate VARMAX(p,q,r) with provided exogenous series x.
y_t = c + sum_i A_i y_{t-i} + sum_l B_l x_{t-l} + e_t + sum_j M_j e_{t-j}
Args:
x: (T, m) exogenous inputs. Returned y will have length (T - burnin - max_lag).
B_list: [B0, B1, ..., Br] where B_l is (k,m)
M_list: [M1, ..., Mq] where M_j is (k,k)
Returns:
y: (n, k)
e: (n, k)
"""
rng_local = np.random.default_rng(seed)
x = np.asarray(x, dtype=float)
if x.ndim != 2:
raise ValueError("x must be (T,m)")
T_x, m = x.shape
p = len(A_list)
r = len(B_list) - 1
q = 0 if (M_list is None) else len(M_list)
M_list = [] if M_list is None else M_list
if p == 0 and r < 0 and q == 0:
raise ValueError("Need at least one of p, r, q")
k = (A_list[0].shape[0] if p > 0 else B_list[0].shape[0])
for A in A_list:
if A.shape != (k, k):
raise ValueError("All A_i must be (k,k)")
for B in B_list:
if B.shape != (k, m):
raise ValueError("All B_l must be (k,m)")
for M in M_list:
if M.shape != (k, k):
raise ValueError("All M_j must be (k,k)")
if c is None:
c = np.zeros(k)
c = np.asarray(c, dtype=float)
if c.shape != (k,):
raise ValueError("c must be (k,)")
if Sigma is None:
Sigma = np.eye(k)
Sigma = np.asarray(Sigma, dtype=float)
if Sigma.shape != (k, k):
raise ValueError("Sigma must be (k,k)")
max_lag = max(p, r, q)
if T_x <= burnin + max_lag:
raise ValueError("x is too short for chosen lags + burnin")
L = np.linalg.cholesky(Sigma)
e = rng_local.standard_normal((T_x, k)) @ L.T
y = np.zeros((T_x, k), dtype=float)
for t in range(max_lag, T_x):
yt = c.copy()
for i, A in enumerate(A_list, start=1):
yt += A @ y[t - i]
for ell, B in enumerate(B_list):
yt += B @ x[t - ell]
yt += e[t]
for j, M in enumerate(M_list, start=1):
yt += M @ e[t - j]
y[t] = yt
sl = slice(burnin + max_lag, T_x)
return y[sl], e[sl]
def varx_ols_fit(
y: np.ndarray,
x: np.ndarray,
p: int,
r: int,
add_intercept: bool = True,
) -> dict:
"""Fit VARX(p,r) by stacked OLS.
y_t = c + sum_i A_i y_{t-i} + sum_{ell=0}^r B_ell x_{t-ell} + e_t
Returns c, A_list, B_list, residuals, Sigma_hat.
"""
y = np.asarray(y, dtype=float)
x = np.asarray(x, dtype=float)
if y.ndim != 2 or x.ndim != 2:
raise ValueError("y and x must be 2D")
T, k = y.shape
Tx, m = x.shape
if Tx != T:
raise ValueError("x and y must have the same length")
max_lag = max(p, r)
if T <= max_lag:
raise ValueError("Need T > max(p,r)")
Y = y[max_lag:]
X_blocks = []
if add_intercept:
X_blocks.append(np.ones((T - max_lag, 1)))
for lag in range(1, p + 1):
X_blocks.append(y[max_lag - lag : T - lag])
for lag in range(0, r + 1):
X_blocks.append(x[max_lag - lag : T - lag])
X = np.concatenate(X_blocks, axis=1)
B_hat, *_ = np.linalg.lstsq(X, Y, rcond=None)
Y_hat = X @ B_hat
E = Y - Y_hat
offset = 0
c_hat = np.zeros(k)
if add_intercept:
c_hat = B_hat[0]
offset = 1
A_hat = []
for i in range(p):
A_hat.append(B_hat[offset + i * k : offset + (i + 1) * k].T)
offset2 = offset + p * k
B_list = []
for ell in range(r + 1):
block = B_hat[offset2 + ell * m : offset2 + (ell + 1) * m].T
B_list.append(block)
return {
'c': c_hat,
'A': A_hat,
'B': B_list,
'resid': E,
'Sigma': (E.T @ E) / (T - max_lag),  # MLE-style divisor, no dof correction
'X': X,
'Y': Y,
}
def varx_forecast_paths(
y_hist: np.ndarray,
x_hist: np.ndarray,
x_future: np.ndarray,
c: np.ndarray,
A_list: list[np.ndarray],
B_list: list[np.ndarray],
Sigma: np.ndarray,
n_paths: int = 500,
seed: int = 0,
) -> np.ndarray:
"""Monte Carlo forecasts for VARX with known future x.
Returns paths: (n_paths, H, k)
"""
rng_local = np.random.default_rng(seed)
y_hist = np.asarray(y_hist, dtype=float)
x_hist = np.asarray(x_hist, dtype=float)
x_future = np.asarray(x_future, dtype=float)
T, k = y_hist.shape
Tx, m = x_hist.shape
if Tx != T:
raise ValueError("x_hist and y_hist must match")
H, m2 = x_future.shape
if m2 != m:
raise ValueError("x_future must have same m as x_hist")
p = len(A_list)
r = len(B_list) - 1
max_lag = max(p, r)
if T < max_lag:
raise ValueError("Need enough history")
L = np.linalg.cholesky(Sigma)
# stitch x into one array for lag lookup
x_ext = np.vstack([x_hist, x_future])
paths = np.zeros((n_paths, H, k), dtype=float)
for s in range(n_paths):
y_ext = np.zeros((T + H, k), dtype=float)
y_ext[:T] = y_hist
e_fut = rng_local.standard_normal((H, k)) @ L.T
for t in range(T, T + H):
yt = c.copy()
for i, A in enumerate(A_list, start=1):
yt += A @ y_ext[t - i]
for ell, B in enumerate(B_list):
yt += B @ x_ext[t - ell]
yt += e_fut[t - T]
y_ext[t] = yt
paths[s] = y_ext[T:]
return paths
def varx_dynamic_multiplier(A_list: list[np.ndarray], B_list: list[np.ndarray], steps: int) -> np.ndarray:
"""Theta_0..Theta_steps where y_{t+h} response to a unit pulse in x_t.
For VARX: y_t = sum_i A_i y_{t-i} + sum_ell B_ell x_{t-ell}.
Recursion:
Theta_h = B_h + sum_{i=1}^{min(p,h)} A_i Theta_{h-i}
with B_h = 0 for h>r.
Returns: (steps+1, k, m)
"""
p = len(A_list)
r = len(B_list) - 1
k, m = B_list[0].shape
Theta = np.zeros((steps + 1, k, m))
for h in range(0, steps + 1):
Bh = B_list[h] if h <= r else np.zeros((k, m))
acc = Bh.copy()
for i in range(1, min(p, h) + 1):
acc += A_list[i - 1] @ Theta[h - i]
Theta[h] = acc
return Theta
def varma_irf(A_list: list[np.ndarray], M_list: list[np.ndarray], steps: int) -> np.ndarray:
"""Impulse response matrices Psi_0..Psi_steps for VARMA(p,q)."""
p = len(A_list)
q = len(M_list)
k = (A_list[0].shape[0] if p > 0 else M_list[0].shape[0])
Psi = np.zeros((steps + 1, k, k))
Psi[0] = np.eye(k)
for h in range(1, steps + 1):
Mh = M_list[h - 1] if (1 <= h <= q) else np.zeros((k, k))
acc = Mh.copy()
for i in range(1, min(p, h) + 1):
acc += A_list[i - 1] @ Psi[h - i]
Psi[h] = acc
return Psi
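As a quick cross-check of varma_irf (a sketch relying on the standard companion-form identity; A_demo is an arbitrary illustrative pair of stable coefficient matrices): when (q=0), (\Psi_h) equals the top-left (k\times k) block of the (h)-th power of the VAR companion matrix.

```python
def var_companion(A_list: list[np.ndarray]) -> np.ndarray:
    """Stack VAR(p) coefficients into the (kp, kp) companion matrix [[A_1..A_p], [I, 0]]."""
    p = len(A_list)
    k = A_list[0].shape[0]
    C = np.zeros((k * p, k * p))
    C[:k] = np.hstack(A_list)
    C[k:, :-k] = np.eye(k * (p - 1))
    return C

A_demo = [np.array([[0.5, 0.1], [0.0, 0.3]]),
          np.array([[-0.2, 0.0], [0.05, -0.1]])]
Psi_check = varma_irf(A_list=A_demo, M_list=[], steps=5)
C = var_companion(A_demo)
Ch = np.eye(C.shape[0])  # C^0
for h in range(6):
    assert np.allclose(Psi_check[h], Ch[:2, :2])
    Ch = Ch @ C
print("Psi_h matches companion-matrix powers for h = 0..5")
```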
5) Synthetic example: VARMAX dynamics (simulate) + VARX estimation baseline#
We simulate a 2D output (\mathbf{y}_t) driven by:
its own lags (VAR part),
a 1D exogenous signal (x_t) (input),
and a short MA(1) disturbance term.
Then we fit a VARX (no MA) by OLS as a baseline.
# Exogenous input: a smooth driver + an intervention step
T = 900
t = np.arange(T)
x = np.zeros((T, 1))
x[:, 0] = 0.6 * np.sin(2 * np.pi * t / 60) + 0.25 * np.sin(2 * np.pi * t / 15)
# intervention at t=600
x[t >= 600, 0] += 1.0
# VAR part (stable)
A1 = np.array([[0.55, 0.10],
[-0.05, 0.35]])
A2 = np.array([[-0.20, 0.03],
[0.01, -0.12]])
A = [A1, A2]
# Exogenous lags: B0 (instant), B1 (delayed)
B0 = np.array([[0.50],
[0.10]])
B1 = np.array([[0.15],
[0.25]])
B = [B0, B1]
# MA(1) disturbance
M1 = np.array([[0.45, 0.00],
[0.20, 0.20]])
M = [M1]
c = np.array([0.0, 0.0])
Sigma = np.array([[0.8, 0.25],
[0.25, 0.6]])
# Simulate
Y, E = simulate_varmax(x=x, A_list=A, B_list=B, M_list=M, c=c, Sigma=Sigma, burnin=250, seed=SEED)
X = x[250 + max(len(A), len(B) - 1, len(M)):]  # drop burnin + max_lag to align x with the returned y
print("Y:", Y.shape, "X:", X.shape)
# Plot y and x
T_eff = Y.shape[0]
t_eff = np.arange(T_eff)
fig = make_subplots(
rows=3,
cols=1,
shared_xaxes=True,
vertical_spacing=0.05,
subplot_titles=["y[0]", "y[1]", "x (exogenous)"]
)
fig.add_trace(go.Scatter(x=t_eff, y=Y[:, 0], mode="lines", name="y0"), row=1, col=1)
fig.add_trace(go.Scatter(x=t_eff, y=Y[:, 1], mode="lines", name="y1"), row=2, col=1)
fig.add_trace(go.Scatter(x=t_eff, y=X[:, 0], mode="lines", name="x"), row=3, col=1)
fig.update_layout(height=620, title="Simulated VARMAX outputs driven by exogenous input")
fig.show()
Y: (648, 2) X: (648, 1)
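The code comment above labels the VAR part stable; a quick verification (reusing the var_companion sketch from earlier): the process is stable iff every eigenvalue of the companion matrix lies strictly inside the unit circle.

```python
rho = np.max(np.abs(np.linalg.eigvals(var_companion(A))))
print(f"companion spectral radius: {rho:.3f}  (stable iff < 1)")
```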
Fit a VARX baseline (ignoring MA)#
When (q>0), the residuals from a VARX fit remain autocorrelated because the true disturbance is MA. Also note that the input here is smooth, so (x_t) and (x_{t-1}) are highly collinear: the individual coefficients (\hat B_0) and (\hat B_1) can land far from the true (B_0) and (B_1) (as in the output below) even when their combined effect is estimated reasonably well. VARX is still often a useful baseline.
fit = varx_ols_fit(y=Y, x=X, p=2, r=1, add_intercept=True)
print("c_hat:", fit["c"])
print("B0_hat:")
print(fit["B"][0])
print("B1_hat:")
print(fit["B"][1])
print("Sigma_hat:")
print(fit["Sigma"])
c_hat: [ 0.0881 -0.0713]
B0_hat:
[[-0.4179]
[ 0.0591]]
B1_hat:
[[0.8749]
[0.2066]]
Sigma_hat:
[[0.7877 0.2877]
[0.2877 0.6314]]
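A quick check of that residual autocorrelation claim (a sketch using fit['resid'] from above; with the MA(1) disturbance ignored, the lag-1 values should be visibly nonzero):

```python
E_hat = fit['resid']
for lag in (1, 2, 3):
    ac = np.array([np.corrcoef(E_hat[:-lag, i], E_hat[lag:, i])[0, 1]
                   for i in range(E_hat.shape[1])])
    print(f"lag-{lag} residual autocorrelation per series: {ac.round(3)}")
```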
6) Multivariate forecasts with known future inputs#
Forecasting with exogenous inputs is typically scenario-based:
decide or forecast (x_{T+1:T+H}),
propagate the model forward.
Below we forecast (H) steps ahead using the fitted VARX parameters.
T0 = 520
H = 80
y_hist = Y[:T0]
x_hist = X[:T0]
y_true = Y[T0:T0+H]
x_future = X[T0:T0+H]
paths = varx_forecast_paths(
y_hist=y_hist,
x_hist=x_hist,
x_future=x_future,
c=fit['c'],
A_list=fit['A'],
B_list=fit['B'],
Sigma=fit['Sigma'],
n_paths=800,
seed=SEED + 100,
)
q_lo, q_med, q_hi = np.quantile(paths, [0.05, 0.5, 0.95], axis=0)
x_past = np.arange(T0)
x_fut = np.arange(T0, T0 + H)
fig = make_subplots(rows=3, cols=1, shared_xaxes=True, vertical_spacing=0.05,
subplot_titles=["Forecast y[0]", "Forecast y[1]", "x future (given)"])
for i in range(2):
fig.add_trace(go.Scatter(x=x_past, y=y_hist[:, i], mode="lines", name=f"y{i} history",
line=dict(color="rgba(0,0,0,0.55)")), row=i+1, col=1)
fig.add_trace(go.Scatter(x=x_fut, y=y_true[:, i], mode="lines", name=f"y{i} truth",
line=dict(color="rgba(0,0,0,1.0)", width=2)), row=i+1, col=1)
fig.add_trace(go.Scatter(x=x_fut, y=q_hi[:, i], mode="lines", line=dict(width=0),
showlegend=False, hoverinfo="skip"), row=i+1, col=1)
fig.add_trace(go.Scatter(x=x_fut, y=q_lo[:, i], mode="lines", line=dict(width=0),
fill="tonexty", fillcolor="rgba(0,160,120,0.18)",
name=f"90% PI y{i}", hoverinfo="skip"), row=i+1, col=1)
fig.add_trace(go.Scatter(x=x_fut, y=q_med[:, i], mode="lines", name=f"median y{i}",
line=dict(color="rgba(0,160,120,1.0)", dash="dash")), row=i+1, col=1)
fig.add_vline(x=T0, line_width=1, line_dash="dot", line_color="gray")
fig.add_trace(go.Scatter(x=x_fut, y=x_future[:, 0], mode="lines", name="x given",
line=dict(color="rgba(120,120,120,1.0)")), row=3, col=1)
fig.update_layout(height=700, title="VARX forecasts conditional on exogenous path")
fig.show()
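As a sanity check (a sketch reusing the fitted parameters): since the model is linear with symmetric shocks, the Monte Carlo median should track the deterministic point forecast obtained by propagating the recursion with all future shocks set to zero.

```python
# deterministic point forecast: same recursion as the simulator, zero future shocks
x_ext = np.vstack([x_hist, x_future])
y_ext = np.vstack([y_hist, np.zeros((H, y_hist.shape[1]))])
for t in range(T0, T0 + H):
    yt = fit['c'].copy()
    for i, A_i in enumerate(fit['A'], start=1):
        yt += A_i @ y_ext[t - i]
    for ell, B_ell in enumerate(fit['B']):
        yt += B_ell @ x_ext[t - ell]
    y_ext[t] = yt
point_fc = y_ext[T0:]
print("max |MC median - point forecast|:", float(np.abs(q_med - point_fc).max()))
```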
7) Shock propagation#
Two kinds:
Innovation shocks (\varepsilon_t): use impulse responses (\Psi_h) (same recursion as VARMA).
Input shocks (x_t): use dynamic multipliers (\Theta_h) for VARX.
Below we show:
response to a unit pulse in the input at time 0 (dynamic multipliers, using the fitted VARX),
response to a 1-s.d. innovation shock (impulse responses, computed from the true (A_i) and (M_1) used in the simulation).
# (1) Dynamic multipliers: response to a unit pulse in x
H_mul = 25
Theta = varx_dynamic_multiplier(A_list=fit['A'], B_list=fit['B'], steps=H_mul) # (H+1,k,m)
h = np.arange(H_mul + 1)
resp = Theta[:, :, 0] # m=1
fig = make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.08,
subplot_titles=["y[0] response to x pulse", "y[1] response to x pulse"])
fig.add_trace(go.Scatter(x=h, y=resp[:, 0], mode="lines", name="y0"), row=1, col=1)
fig.add_trace(go.Scatter(x=h, y=resp[:, 1], mode="lines", name="y1"), row=2, col=1)
fig.update_layout(height=520, title="VARX shock propagation from exogenous input (dynamic multipliers)")
fig.update_xaxes(title_text="horizon")
fig.show()
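A useful summary of these multipliers is the long-run gain: the cumulative response to a permanent unit step in (x), which for a stable VARX equals (A(1)^{-1}B(1) = (I - \sum_i A_i)^{-1}\sum_\ell B_\ell). A quick comparison against the truncated sum of the fitted (\Theta_h) (a sketch; 25 steps is plenty here given how fast the dynamics decay):

```python
long_run = np.linalg.solve(np.eye(2) - sum(fit['A']), sum(fit['B']))
print("closed-form long-run gain A(1)^{-1} B(1):", long_run.ravel())
print("truncated sum of Theta_h:           ", Theta.sum(axis=0).ravel())
```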
# (2) Innovation impulse responses: response to 1-s.d. shocks in eps
H_irf = 25
Psi = varma_irf(A_list=A, M_list=M, steps=H_irf) # uses the simulated VARMAX disturbance
shock_std = np.sqrt(np.diag(Sigma))
fig = make_subplots(
rows=2,
cols=2,
shared_xaxes=True,
shared_yaxes=False,
horizontal_spacing=0.12,
vertical_spacing=0.10,
subplot_titles=[
"response y0 to shock e0",
"response y0 to shock e1",
"response y1 to shock e0",
"response y1 to shock e1",
],
)
h = np.arange(H_irf + 1)
for shock_j in range(2):
u = np.zeros(2)
u[shock_j] = shock_std[shock_j]
resp = Psi @ u # (H+1, k)
fig.add_trace(go.Scatter(x=h, y=resp[:, 0], mode="lines", name=f"shock {shock_j}"), row=1, col=shock_j+1)
fig.add_trace(go.Scatter(x=h, y=resp[:, 1], mode="lines", showlegend=False), row=2, col=shock_j+1)
fig.update_layout(height=520, title="VARMAX impulse responses (1-s.d. innovations)")
fig.update_xaxes(title_text="horizon")
fig.show()
8) Exercises#
Change (B_0) and (B_1) signs and interpret the dynamic multiplier shapes.
Add more exogenous lags (increase (r)) and see how (\Theta_h) changes.
Simulate with stronger MA(1) and compare VARX residual behavior.
References#
Lütkepohl, New Introduction to Multiple Time Series Analysis
Hamilton, Time Series Analysis
Tsay, Multivariate Time Series Analysis